SupR is installed in /opt/SupR/1.1 on a few machines. No download is necessary.
$ cd ~/Downloads
$ wget --user username --password password http://www.stat.purdue.edu/~chuanhai/SupR/internal/SupR-1.1.0-ubuntu-16.04.tar
$ cd /opt
$ sudo tar -xvf ~/Downloads/SupR-1.1.0-ubuntu-16.04.tar
export SUPR_HOME='/opt/SupR/1.1'
export SUPR_MASTER_HOST='localhost'
export SUPR_MASTER_PORT=5118
export SUPR_GLOBAL_DIR=$HOME'/.SupR'
setenv SUPR_HOME '/opt/SupR/1.1'
setenv SUPR_MASTER_HOST 'localhost'
setenv SUPR_MASTER_PORT 5118
setenv SUPR_GLOBAL_DIR $HOME'/.SupR'
$ $SUPR_HOME/bin/R
R version 3.1.1 (2014-07-10) -- "Sock it to Me"
Copyright (C) 2014 The R Foundation for Statistical Computing
Platform: x86_64-unknown-linux-gnu (64-bit)
R is free software and comes with ABSOLUTELY NO WARRANTY.
You are welcome to redistribute it under certain conditions.
Type 'license()' or 'licence()' for distribution details.
Natural language support but running in an English locale
R is a collaborative project with many contributors.
Type 'contributors()' for more information and
'citation()' on how to cite R or R packages in publications.
Type 'demo()' for some demos, 'help()' for on-line help, or
'help.start()' for an HTML browser interface to help.
Type 'q()' to quit R.
___________________________________________________________
SupR Version 1.0.0. (Internal Release Version 01)
> 1024(1024)
[1] 1048576
> implicit()
$matrix
[1] "%*%"
$numeric
[1] "*"
$integer
[1] "*"
> iter = as.iterator(1:5)
> while(has.next(iter)) print(get.next(iter))
[1] 1
[1] 2
[1] 3
[1] 4
[1] 5
> new.thread(1+2, start=TRUE)
...
> options(info.enabled=FALSE)
> options(info.enabled=TRUE)
> options(debug.enabled=TRUE)
> options(debug.enabled=FALSE)
$ $SUPR_HOME/bin/R "-e start.master()"
...
SupR Version 1.0.0. (Internal Release Version 01)
> start.master()
...
[Master[29717,29717]] thread BlockManagerMaster_1 launched at Fri Mar 25 11:26:23 2016
[Master[29717,29717]] Master started: xxxx.xxxx.xxxx.xxx:5008
...
$ $SUPR_HOME/bin/R "-e start.worker()"
...
> start.worker()
...
[BlockManager_1[29896,29899]] thread BlockTransferService_2 launched at Fri Mar 25 11:30:24 2016
...
[Executor_3[29896,29901]] Set local directory, /home/skew/u5/chuanhai/.SupR/__LOCAL__/xxxx.xxxx.xxxx.xxx/worker-0000
...
[Heartbeat_localhost:5008[29896,29903]] heartbeat is running
start.worker(conf = pairlist(nexecutors=n))
$ $SUPR_HOME/bin/R
...
SupR Version 1.0.0. (Internal Release Version 01)
...
> start.driver()
...
[BlockManager_1[30743,30745]] thread BlockTransferService_2 launched at Fri Mar 25 11:37:28 2016
...
[Driver[30743,30743]] Set local directory, /home/skew/u5/chuanhai/.SupR/__LOCAL__/somehostname/driver
...
# show cluster connections. the function name is subject to change ...
> showClusterConnections()
$main
TYPE host port local_port PID nCpu n_tasks block_manager fd alive
1 MASTER localhost 5008 5008 29717 0 0 7 5.996109
2 WORKER xxx.xxx.xxx 8801 8801 29896 8 0 xxx.xxx.xxx:6009 15 2.523262
3 BMNGER localhost 6010 6010 30743 0 0 14 5.828085
comment
1 registered
2 registered
3 registered
$block_manager
TYPE host port local_port PID nCpu n_tasks block_manager fd
1 BMNGER xxx.xxx.xxx 6010 6010 1 8 0 -1
2 BM_MASTER xxx.xxx.xxx 6008 6008 29717 0 0 11
3 DRIVER localhost NA 9383 0 8 0 xxx.xxx.xxx:6010 12
alive comment
1 5.959262 socket_server
2 5.957455 registered
3 5.826343
> # test some functions under refinement
> options(info.enabled=FALSE)
> options(info.enabled=TRUE)
> options(debug.enabled=TRUE)
> options(debug.enabled=FALSE)
>
> ? thread
> ? cluster
> ? dfs
> ? supr.supp
########################
# Linear Regression(1) #
########################
# Simulate data from Y = X beta + sigma rnorm(m).
#
# Returns an iterator that produces n subsamples of size m.
#
# Arguments:
#   n      number of subsets still to be generated
#   m      number of rows in each subset
#   p      number of random covariates; X also gets a leading column of ones,
#          so beta must have 1 + p entries
#   beta   regression coefficients (intercept first)
#   sigma  multiplier applied to the rnorm(m) noise term
#
# Each value produced is the matrix cbind(X, Y) with m rows.
#
# NOTE: X(beta) and sigma(rnorm(m)) use SupR's implicit multiplication
# (implicit() reports "%*%" for matrices and "*" for numerics); they are not
# valid calls in plain R.
lr.subsets = function(n, m, p, beta, sigma = 1)
{
  # One-element look-ahead buffer: filled by has.next(), emptied by get.next().
  XY = NULL

  has.next = function() {
    if (!is.null(XY)) return(TRUE)
    # `<=` rather than `==`, so that a negative or fractional n still terminates.
    if (n <= 0) return(FALSE)
    # decrease n by one and create one subset
    n <<- n - 1
    X <- matrix(c(rep(1, m), rnorm(m*p)), nrow = m)
    Y <- X(beta) + sigma(rnorm(m))
    XY <<- cbind(X, Y)
    TRUE
  }

  get.next = function() {
    # Fill the buffer on demand so that get.next() also works when the caller
    # did not call has.next() first; it still fails once the subsets run out.
    if (!has.next()) stop("cannot find the next value")
    xy <- XY
    XY <<- NULL
    xy
  }

  iterator(has.next, get.next)
}
# The sweep operator -- make it an internal function later ???
#
# Sweeps the square matrix S on each pivot listed in K, in order.
#
# Arguments:
#   S  square numeric matrix; an optional attribute "det" carries the
#      determinant accumulated by earlier sweeps (taken as 1 when absent)
#   K  vector of pivot indices; a positive entry k sweeps on pivot k, a
#      negative entry -k applies the reverse sweep on pivot k
#
# Returns the swept matrix with attribute "det" set to the incoming "det"
# multiplied by every pivot used (divided, for reverse sweeps). Sweeping a
# positive-definite matrix on all of its pivots yields -solve(S) with
# det = det(S).
#
# Note: this definition masks base::sweep.
sweep = function (S, K) {
  det = attr(S, "det")
  ln.det = if (is.null(det)) 0.0 else log(det)
  for (k in K) {
    # A pivot of 0 (or NA) selects nothing and would silently return a result
    # with an empty "det" attribute, so reject it explicitly.
    if (is.na(k) || k == 0) {
      stop("sweep: pivot indices in K must be non-zero", call. = FALSE)
    }
    flag = sign(k)
    k = abs(k)
    a = S[k, k]
    # For a reverse sweep the stored pivot is -1/(original pivot), so
    # flag * a is positive and its log subtracts the original contribution.
    ln.det = ln.det + log(flag * a)
    S[k, k] = -1 / a
    # drop = FALSE keeps the column and row as matrices so that %*% forms the
    # outer product even when only one other row/column remains.
    S[-k, -k] = S[-k, -k] - S[-k, k, drop = FALSE] %*% S[k, -k, drop = FALSE] / a
    a = a * flag
    S[k, -k] = S[k, -k] / a
    S[-k, k] = S[-k, k] / a
  }
  structure(S, det = exp(ln.det))
}
#
############################
#/*Linear Regression (1)*/ #
############################
# dfs.rm("cluster://lr", TRUE) # FIXME
# True parameters of the simulated regression: an intercept plus p slopes,
# and the noise scale.
p <- 4
beta <- rnorm(p + 1)
sigma <- 0.1
# Iterator over 100 simulated subsets; 10(1000) is SupR implicit
# multiplication for the number of rows per subset.
iter <- lr.subsets(n = 100, m = 10(1000), p = p, beta = beta, sigma = sigma)
# Hand the subsets to the cluster under the name "lr".
distribute(iter, name = "lr", np = 1, nrep = 2)
# dfs.ls("cluster://lr")
> # map.reduce(lr, class(.[[1]]), c)
> # map.reduce(//lr, function(x) class(x[[1]]), c)
> map.reduce(lr, function(x) class(x[[1]]), c)
> # ss = map.reduce(lr, t(XY<-.[[1]])(XY), `+`)
> # ss = map.reduce(lr, function(x) t(XY<-x[[1]])(XY), `+`)
> ss = map.reduce(//lr, function(x) t(XY<-x[[1]])(XY), `+`)
# ss is the pooled cross-product matrix t(XY) %*% XY summed over all subsets.
# Its last row/column belongs to Y; the others belong to the columns of X.
P <- nrow(ss) - 1
# The first column of every X is all ones, so ss[1, 1] counts the rows.
N <- as.integer(ss[1, 1])
# Sweep on the X pivots, then read the fit off the last row.
H <- sweep(ss, 1:P)
beta.hat <- H[P + 1, -(P + 1)]
sigma.hat <- sqrt(H[P + 1, P + 1] / (N - P))
# Checking: estimates next to the values used for the simulation
data.frame(beta.hat, beta)
c(sigma.hat, sigma)
# Simulate incomplete data for the EM example.
#
# Arguments:
#   N         number of values to draw (the default 10(1000) relies on SupR
#             implicit multiplication)
#   mis.frac  probability that any single value is marked missing
#   mu        mean of the normal draws (unit standard deviation)
#
# Returns a numeric vector of length N in which the missing values are NA;
# this is the observed data from which the EM algorithm is to find the MLE.
simu <- function(N = 10(1000), mis.frac = 0.25, mu = 8.0) {
  draws <- rnorm(N) + mu
  # Independently flag each value as missing with probability mis.frac.
  is.na(draws) <- runif(N) < mis.frac
  draws
}
# Split the incomplete data into np subsets, by making use of
# the function matrix.
#
# Arguments:
#   X   vector of data values
#   np  number of subsets (columns of the result)
#
# Returns a matrix with np columns, one subset per column. X is laid out
# row by row, so column j holds X[j], X[j + np], X[j + 2*np], ...; when
# length(X) is not a multiple of np the final row is padded with NA.
#
# Note: this definition masks base::split.
split = function(X, np)
{
  N = length(X)
  ncols = np
  # Smallest row count that holds all N values (at least one row).
  nrows = as.integer((N - 1) / ncols) + 1
  padding = rep(NA, nrows * ncols - N)
  # Spell out TRUE: T is an ordinary variable that can be reassigned.
  matrix(c(X, padding), nrows, ncols, byrow = TRUE)
}
# Number of subsets; the E-step code below creates one thread per subset.
NP <- 4
# Incomplete data with each value missing with probability 0.5.
data <- simu(mis.frac = 0.5)
# One data-frame column per subset.
subsets <- data.frame(split(data, np = NP))
# Log-likelihood of the observed (non-missing) values at the current mean.
#
# Reads the global variables `data` (values, NA = missing) and `MU` (current
# estimate of the mean). Returns minus half the sum of squared deviations of
# the observed values from MU; the additive constant is left out.
log.likelihood <- function() {
  observed <- data[!is.na(data)]
  resid <- observed - MU
  -sum(resid * resid) / 2
}
# Palette for the trace plot: the first 100 of 200 rainbow colours, indexed
# by iteration number. (Assigned once; the original repeated this identical
# assignment after X11().)
Colors <- rainbow(200)[1:100]
# Open an X11 graphics device for the plot.
X11()
# State shared between the M-step and E-step threads.
MU <- 0 # in .GlobalEnv = globalenv()
# Running totals for one EM iteration: sum of the completed data, number of
# values, and how many E-step threads have not reported yet.
SS <- list(sum.x = 0, n = 0, n.threads = NP)
# Objects passed to sync.eval/wait/notify by the M-step and E-step threads.
sync.m <- "another object"
sync.e <- "one more object"
# M-step thread of the EM example.
#
# The whole body is evaluated under sync.eval(sync.m, ...). Each of the
# max_iter iterations:
#   1. calls wait(sync.m); an E-step thread calls notify(sync.m) once
#      SS$n.threads has reached 0;
#   2. sets the global MU to SS$sum.x/SS$n;
#   3. resets the global SS to empty totals, or to NULL on the last iteration
#      (the E-step loops run while(!is.null(SS)), so NULL ends them);
#   4. calls notify(sync.e, all=TRUE) under sync.eval(sync.e, ...);
#   5. appends log.likelihood() to the history and redraws the trace plot.
#
# NOTE(review): new.thread, sync.eval, wait, notify and current.thread are
# SupR primitives that are not defined here. Presumably sync.eval holds a
# lock on its first argument and wait releases it while blocked (monitor
# semantics), and start=TRUE launches the thread immediately -- confirm
# against `? thread`.
M.thread = new.thread(
# Environment for the thread body. NOTE(review): mu is not referenced by the
# body below, which reads and writes the global MU instead.
env = list2env(list(max_iter = 100, NP=NP, mu=0)),
sync.eval(sync.m, {
# ll: log-likelihood history; iter: the matching iteration numbers
ll = iter = numeric(0)
for(i in 1:max_iter){
cat("\n\033[0;32mM-step is ready ...\033[0m\n")
wait(sync.m) # print(SS); if(SS$n < 1000) {SS <<- NULL; stop("Whoops")}
# M-step: the new mean is the average of the completed data
MU <<- SS$sum.x/SS$n
SS <<- if(i == max_iter) NULL else list(sum.x=0, n=0, n.threads = NP)
sync.eval(sync.e, notify(sync.e, all=TRUE))
ll = c(ll,log.likelihood()); iter = c(iter,i)
cat(current.thread(),"\033[0;34mM-Step: iteration", i, MU, "\033[0m\n")
# Plot the history shifted so that its smallest value sits at 0
plot(iter, ll-min(ll), pch=16, col=Colors[iter],
xlab="iteration", ylab="log.likelihood")
# With probability .25 drop the oldest point from the plotted history
if(runif(1) < .25) {ll = ll[-1]; iter = iter[-1]}
} }), start=TRUE)
# E-step threads of the EM example: one thread per column of `subsets`.
#
# Each thread gets an environment holding its own subset X. While the global
# SS is not NULL it
#   1. replaces the missing entries of X by the current global MU;
#   2. computes the sum and the number of values of the completed subset;
#   3. under sync.eval(sync.e, ...), adds both to SS and decrements
#      SS$n.threads;
#   4. if SS$n.threads has reached 0, calls notify(sync.m) under
#      sync.eval(sync.m, ...);
#   5. calls wait(sync.e); the M-step thread calls notify(sync.e, all=TRUE)
#      after it has updated MU and SS.
#
# NOTE(review): new.thread, sync.eval, wait, notify, start, join.thread and
# current.thread are SupR primitives that are not defined here. Presumably
# sync.eval holds a lock on its first argument (re-entrantly, given the
# nested sync.eval(sync.e, ...)) and wait releases it while blocked --
# confirm against `? thread`.

# Placeholder list of length NP; every element is replaced by a thread below
E.threads = as.list(1:NP)
# No start=TRUE here: the threads are started by the start() loop further down
for(i in 1:length(E.threads)) E.threads[[i]] = new.thread(
env=list2env(list(X=subsets[[i]])),
# Positions of the missing values; fixed for the lifetime of the thread
{ mis = is.na(X)
while(!is.null(SS)) {
# E-step: impute the missing values with the current mean
X[mis] = MU
# Local totals of the completed subset (n counts imputed values as well)
ss = list(sum.x = sum(X), n = length(X))
sync.eval(sync.e, { SS <<- list(sum.x = SS$sum.x + ss$sum.x,
n = SS$n + ss$n, n.threads = SS$n.threads - 1)
cat("\033[0;33m",current.thread(), "\tn.threads =", SS$n.threads,
"\033[0m\n")
# The last thread to report notifies on sync.m, which the M-step waits on
if(SS$n.threads==0) sync.eval(sync.m, notify(sync.m))
# Block until the M-step calls notify(sync.e, all=TRUE)
sync.eval(sync.e, wait(sync.e))
})
}
})
for(i in 1:length(E.threads)) start(E.threads[[i]])
# Join the M-step thread first, then print the result of joining each
# E-step thread
join.thread(M.thread)
for(e in E.threads) print(join.thread(e))